跳到主要内容

Kafka 的基本概念和结构

Kafka 官方文档

涉及的角色

Producer:消息生产者,将消息 push 到 Kafka 集群中的 Broker。

Consumer:消息消费者,从 Kafka 集群中 pull 消息,消费消息。

Consumer Group:消费者组,由一到多个 Consumer 组成,每个 Consumer 都属于一个 Consumer Group。

  • 消费者组在逻辑上是同一个订阅者。
  • 消费者组内每个消费者负责消费不同分区(Partition)的数据,一个分区只能由一个组内消费者消费;
  • 消费者组之间互不影响。即每条消息只能被 Consumer Group 中的一个 Consumer 消费;
  • 但是可以被多个 Consumer Group 组消费。这样就实现了单播和多播。

Broker:一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。

Partition 分区

Topic 和 Partition 的关系

一个 Topic 可以认为是一类信息,逻辑上的队列,每条消息都要指定 Topic。为了使得 Kafka 的吞吐量可以线性提高(负载均衡与扩展性考虑),物理上将 Topic 分成一个或多个 Partition。

每个 Partition 在存储层面是 append log 文件,消息 push 进来后,会被追加到 log 文件的尾部,每条消息在文件中的位置成为 offset(偏移量),offset 是一个 long 型数字,唯一的标识一条信息。

kafka 的存储文件都是按照 offset.kafka 来命名,用 offset 做名字的好处是方便查找。例如你想找位于 2049 的位置,只要找到 2048.kafka 的文件即可。当然 the first offset 就是 00000000000.kafka

因为每条消息都追加到 Partition 的尾部,所以属于磁盘的顺序写,效率很高。如图:

Kafka 为一个 Partition 生成多个副本,并且把它们分散在不同的 Broker。如果一个 Broker 故障了,Consumer 可以在其他 Broker 上找到 Partition 的副本,继续获取消息。

Offsets 和消息的顺序

Partition 中的每条记录都会被分配一个唯一的序号,称为 Offset(偏移量)。Offset 是一个递增的、不可变的数字,由 Kafka 自动维护。当一条记录写入 Partition 的时候,它就被追加到 log 文件的末尾,并被分配一个序号,作为 Offset。(注意!!分区里面的内容是不同的,下面的图的编号指的是当前分区内的偏移量,实际上它的概念有点像负载均衡那样)

如上图,这个 Topic 有 3 个 Partition 分区,向 Topic 发送消息的时候,实际上是被写入某一个 Partition,并赋予 Offset。

消息的顺序性需要注意,一个 Topic 如果有多个 Partition 的话,那么从 Topic 这个层面来看,消息是无序的。

但单独看 Partition 的话,Partition 内部消息是有序的。

所以,一个 Partition 内部消息有序,一个 Topic 跨 Partition 是无序的。

如果强制要求 Topic 整体有序,就只能让 Topic 只有一个 Partition。

写入 Partition ⭐

一个 Topic 有多个 Partition,那么,向一个 Topic 中发送消息的时候,具体是写入哪个 Partition 呢? 有 3 种写入方式。

1、使用 Partition Key 写入特定 Partition

Producer 发送消息的时候,可以指定一个 Partition Key,这样就可以写入特定 Partition 了。Partition Key 可以使用任意值,例如设备ID、User ID。Partition Key 会传递给一个 Hash 函数,由计算结果决定写入哪个 Partition。所以,有相同 Partition Key 的消息,会被放到相同的 Partition。

例如使用 User ID 作为 Partition Key,那么此 ID 的消息就都在同一个 Partition,这样可以保证此类消息的有序性。

这种方式需要注意 Partition 热点问题。例如使用 User ID 作为 Partition Key,如果某一个 User 产生的消息特别多,是一个头部活跃用户,那么此用户的消息都进入同一个 Partition 就会产生热点问题,导致某个 Partition 极其繁忙。

2、由 kafka 决定

如果没有使用 Partition Key,Kafka 就会使用轮询的方式来决定写入哪个 Partition。

这样,消息会均衡的写入各个 Partition。

但这样无法确保消息的有序性。

3、自定义规则

Kafka 支持自定义规则,一个 Producer 可以使用自己的分区指定规则。

读取 Partition

Kafka 不像普通消息队列具有发布/订阅功能,Kafka 不会向 Consumer 推送消息。 Consumer 必须自己从 Topic 的 Partition 拉取消息。 一个 Consumer 连接到一个 Broker 的 Partition,从中依次读取消息。

消息的 Offset 就是 Consumer 的游标,根据 Offset 来记录消息的消费情况。

读完一条消息之后,Consumer 会推进到 Partition 中的下一个 Offset,继续读取消息。 Offset 的推进和记录都是 Consumer 的责任,Kafka 是不管的。

不过不用担心,一般这个 Offset 是持久化保存的,所以不用担心服务挂了就没了

Consumer Group(消费组)

Kafka 中有一个 Consumer Group(消费组)的概念,多个 Consumer 组团去消费一个 Topic。同组的 Consumer 有相同的 Group ID。Consumer Group 机制会保障一条消息只被组内唯一一个 Consumer 消费,不会重复消费。

Consumer Group:消费者组,由一到多个 Consumer 组成,每个 Consumer 都属于一个 Consumer Group。

  • 消费者组在逻辑上是同一个订阅者。
  • 消费者组内每个消费者负责消费不同分区(Partition)的数据,一个分区只能由一个组内消费者消费;
  • 消费者组之间互不影响。即每条消息只能被 Consumer Group 中的一个 Consumer 消费;
  • 但是可以被多个 Consumer Group 组消费。这样就实现了单播和多播。

例如一个 Topic 有 3 个 Partition,你有 4 个 Consumer 负责这个 Topic,也只会有 Consumer 工作,另一个作为后补队员,当某个 Consumer 故障了,它再补上去,是一种很好的容错机制。

消费组这种方式可以让多个 Partition 并行消费,大大提高了消息的消费能力,最大并行度为 Topic 的 Partition 数量。

多副本(Replica)机制

Kafka 为分区(Partition)引入了多副本(Replica)机制。分区中的多个副本之间会有一个叫做 leader 的家伙,其他副本称为 follower。我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。

  • Kafka 通过给特定 Topic 指定多个 Partition,而各个 Partition 可以分布在不同的 Broker 上,提供比较好的并发能力(负载均衡)。
  • Partition 可以指定对应的 Replica 数,这也极大地提高了消息存储的安全性,提高了容灾能力,不过也相应的增加了所需要的存储空间。

生产者和消费者只与 leader 副本交互。可以理解为其他副本只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。当 leader 副本发生故障时会从 follower 中选举出一个 leader,但是 follower 中如果有和 leader 同步程度达不到要求的参加不了 leader 的竞选。

pull 还是 push? ⭐

首先明确一下推拉模式到底是在讨论消息队列的哪一个步骤,一般而言我们 在谈论推拉模式的时候指的是 Comsumer 和 Broker 之间的交互。

默认的认为 Producer 与 Broker 之间就是推的方式,即 Producer 将消息推送给 Broker,而不是 Broker 主动去拉取消息。

RocketMQ 和 Kafka 都是利用“长轮询”来实现拉模式

推模式 push

推模式指的是消息从 Broker 推向 Consumer,即 Consumer 被动的接收消息,由 Broker 来主导消息的发送。

我们来想一下推模式有什么好处?

消息实时性高, Broker 接受完消息之后可以立马推送给 Consumer。对于消费者使用来说更简单,简单啊就等着,反正有消息来了就会推过来。

推模式有什么缺点?

推送速率难以适应消费速率,推模式的目标就是以最快的速度推送消息,当生产者往 Broker 发送消息的速率大于消费者消费消息的速率时,随着时间的增长消费者那边可能就“爆仓”了,因为根本消费不过来啊。当推送速率过快就像 DDos 攻击一样消费者就傻了。

并且不同的消费者的消费速率还不一样,身为 Broker 很难平衡每个消费者的推送速率,如果要实现自适应的推送速率那就需要在推送的时候消费者告诉 Broker ,我不行了你推慢点吧,然后 Broker 需要维护每个消费者的状态进行推送速率的变更。

这其实就增加了 Broker 自身的复杂度。

所以说推模式难以根据消费者的状态控制推送速率,适用于消息量不大、消费能力强要求实时性高的情况下。

拉模式 pull ⭐

拉模式指的是 Consumer 主动向 Broker 请求拉取消息,即 Broker 被动的发送消息给 Consumer。

我们来想一下拉模式有什么好处?

拉模式主动权就在消费者身上了,消费者可以根据自身的情况来发起拉取消息的请求。假设当前消费者觉得自己消费不过来了,它可以根据一定的策略停止拉取,或者间隔拉取都行。

拉模式下 Broker 就相对轻松了,它只管存生产者发来的消息,至于消费的时候自然由消费者主动发起,来一个请求就给它消息呗,从哪开始拿消息,拿多少消费者都告诉它,它就是一个没有感情的工具人,消费者要是没来取也不关它的事。

拉模式可以更合适的进行消息的批量发送,基于推模式可以来一个消息就推送,也可以缓存一些消息之后再推送,但是推送的时候其实不知道消费者到底能不能一次性处理这么多消息。而拉模式就更加合理,它可以参考消费者请求的信息来决定缓存多少消息之后批量发送。

拉模式有什么缺点?

消息延迟,毕竟是消费者去拉取消息,但是消费者怎么知道消息到了呢?所以它只能不断地拉取,但是又不能很频繁地请求,太频繁了就变成消费者在攻击 Broker 了。因此需要降低请求的频率,比如隔个 2 秒请求一次,你看着消息就很有可能延迟 2 秒了。

消息忙请求,忙请求就是比如消息隔了几个小时才有,那么在几个小时之内消费者的请求都是无效的,在做无用功。

Kafka 的消费确认机制 ACK

Kafka 的 ACK 机制,指的是 producer 的消息发送确认机制,这直接影响到 Kafka 集群的吞吐量和消息可靠性。而吞吐量和可靠性就像硬币的两面,两者不可兼得,只能平衡。

ack 有 3 个可选值,分别是 0,1,-1。默认是 1。

const (
// 不发送响应
NoResponse RequiredAcks = 0
// 等待随便一个分区副本响应
WaitForLocal RequiredAcks = 1
// 等待全部分区响应
WaitForAll RequiredAcks = -1
)

ack = 0 意味着 producer 不等待 broker 同步完成的确认,继续发送下一条(批)信息提供了最低的延迟。但是最弱的持久性,当服务器发生故障时,就很可能发生数据丢失。例如 leader 已经死亡,producer 不知情,还会继续发送消息 broker 接收不到数据就会数据丢失

ack = 1,简单来说就是,producer 只要收到一个分区副本成功写入的通知就认为推送消息成功了。这里有一个地方需要注意,这个副本必须是 leader 副本。只有 leader 副本成功写入了,producer 才会认为消息发送成功。

ack = -1,producer 只有收到分区内所有副本的成功写入的通知才认为推送消息成功了。

ZooKeeper 与 Kafka 的关系

注意,新版是不用 ZooKeeper 的,Kafka 自己实现了这些功能

ZooKeeper 是一项集中式服务,用于维护配置信息,命名,提供分布式同步和提供组服务。

当我们编写程序的时候,通常会将所有的配置信息保存在一个配置文件中,例如账号、密码等信息,后续直接修改配置文件就行了,那分布式场景下如何配置呢?如果说每台机器上都保存一个配置文件,这时候要一台台的去修改配置文件难免出错,而且要管理这些机器也会变得复杂和困难,ZooKeeper 的出现就是为了解决这类问题,实现高度可靠的分布式系统。

ZooKeeper 本质上是一个分布式的小文件存储系统。提供基于类似于文件系统的目录树方式的数据存储,并且可以对树种 的节点进行有效管理。从而来维护和监控你存储的数据的状态变化。将通过监控这些数据状态的变化,从而可以达到基于数据的集群管理。

为什么配置 Kafka 需要依赖 Zookeeper 呢?

ZooKeeper 作为给分布式系统提供协调服务的工具被 kafka 所依赖。在分布式系统中,消费者需要知道有哪些生产者是可用的,而如果每次消费者都需要和生产者建立连接并测试是否成功连接,那效率也太低了,显然是不可取的。而通过使用 ZooKeeper 协调服务,Kafka 就能将 Producer,Consumer,Broker 等结合在一起,同时借助 ZooKeeper,Kafka 就能够将所有组件在无状态的条件下建立起生产者和消费者的订阅关系,实现负载均衡。

Broker 信息:在 ZooKeeper 上会有一个专门用来进行 Broker 服务器列表记录的节点,节点路径为 /brokers/ids。Kafka 的每个 Broker 启动时,都会在 ZooKeeper 中注册,创建 /brokers/ids/[0-N] 节点,写入 IP,端口等信息,每个 Broker 都有一个 BrokerId。Broker 创建的是临时节点,在连接断开时节点就会自动删除,所以在 ZooKeeper 上就可以通过 Broker 中节点的变化来得到 Broker 的可用性。

Topic 信息:在 Kafka 中可以定义很多个 Topic,每个 Topic 又被分为很多个 Partition。一般情况下,每个 Partition 独立在存在一个 Broker 上,所有的这些 Topic 和 Broker 的对应关系都由 ZooKeeper 进行维护。

负载均衡:生产者需要将消息发送给 Broker,消费者需要从 Broker 上获取消息,通过使用 ZooKeeper,就都能监听 Broker 上节点的状态信息,从而实现动态负载均衡。

offset 信息:offset 用于记录消费者消费到的位置,在老版本(0.9以前)里 offset 是保存在 ZooKeeper 中的。

Controller 选举

在 Kafka 中会有多个 Broker,其中一个 Broker 会被选举成为 Controller(控制器),在任意时刻,Kafka 集群中有且仅有一个控制器。

Controller 负责管理集群中所有分区和副本的状态,当某个分区的 leader 副本出现故障时,由 Controller 为该分区选举出一个新的 leader。Kafka 的 Controller 选举就依靠 ZooKeeper 来完成,成功竞选为 Controller 的 Broker 会在 ZooKeeper 中创建 /controller 这个临时节点,在 ZooKeeper 中使用 get 命令查看节点内容:

其中 “version” 在目前版本中固定为 1,“brokerid” 表示 Broker 的编号,“timestamp” 表示竞选称为 Controller 时的时间戳。

当 Broker 启动时,会尝试读取 /controller 中的 “brokerid”,如果读取到的值不是-1,则表示已经有节点竞选成为 Controller 了,当前节点就会放弃竞选;而如果读取到的值为-1,ZooKeeper 就会尝试创建 /controller 节点,当该 Broker 去创建的时候,可能还有其他 Broker 一起同时创建节点,但只有一个 Broker 能够创建成功,即成为唯一的 Controller。

Reference

消息队列之推还是拉,RocketMQ 和 Kafka 是如何做的? 细说 Kafka Partition 分区 【kafka】简述kafka的ack机制 docker 配置 kafka+zookeeper,golang操作kafka Kafka 入门(三)--为什么 Kafka 依赖 ZooKeeper? apache kafka系列之在zookeeper中存储结构